{
  "metadata" : {
    "id" : "f55628a5-d546-45fa-b827-56d8e0317801",
    "name" : "occupancy_detection_model",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "F7BA3781875B40E98F4E3031A06C4EA4"
    },
    "cell_type" : "markdown",
    "source" : "# Inference of Room Occupancy using Environment Sensors\n## Part I. Training a Model"
  }, {
    "metadata" : {
      "id" : "A4B2A3EAD73342F89B3D44BA0FFF8C2C"
    },
    "cell_type" : "markdown",
    "source" : "In this notebook, we explore the use of machine learning techniques \nto estimate the occupancy of rooms based on the environment sensors present in them.\n\nFor this exercise, we use the dataset available at: https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+\n\nFirst, we will build a model that learns the relationship between the environmental factors and the occupancy state from a dataset of labeled observations.\nThis is also known as _supervised learning_.\n\nOur dataset consists of timestamped records with the _humidity_, _light levels_, _temperature_ and _CO2 levels_. \nOur training dataset also contains a label that indicates whether the room was occupied or not at the moment of the measurements."
  }, {
    "metadata" : {
      "id" : "BE9D03E4D6474A7C842A164ECE2944B5"
    },
    "cell_type" : "markdown",
    "source" : "### Loading and Parsing the Data"
  }, {
    "metadata" : {
      "id" : "B02DCE024FC4489691085544F9435DBA"
    },
    "cell_type" : "markdown",
    "source" : "Before using this notebook, download the zip file containing the dataset and extract it to a local folder on your machine. \n\nWe will call this folder `dataDir` in the notebook."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "007E35315190479B84C2C284D4A9EFE8"
    },
    "cell_type" : "code",
    "source" : [ "val baseDir = \"/tmp\"  // Change this to an appropriate location\n", "val dataDir = s\"$baseDir/data\"\n", "val modelDir = s\"$baseDir/model\"\n", "val modelFile = s\"$modelDir/occupancy-lg.model\"" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "baseDir: String = /tmp\ndataDir: String = /tmp/data\nmodelDir: String = /tmp/model\nmodelFile: String = /tmp/model/occupancy-lg.model\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 1,
      "time" : "Took: 1.209s, at 2018-08-29 14:43"
    } ]
  }, {
    "metadata" : {
      "id" : "6466825D76E54E8281F5D1F7C3DEEA22"
    },
    "cell_type" : "markdown",
    "source" : "By looking at the first lines of the data, we can see that it is in CSV format and has a header. We can use the CSV reader to load the data.\n```\n\"id\",\"date\",\"Temperature\",\"Humidity\",\"Light\",\"CO2\",\"HumidityRatio\",\"Occupancy\"\n\"1\",\"2015-02-04 17:51:00\",23.18,27.272,426,721.25,0.00479298817650529,1\n```\n(Note that the header in the original dataset is missing the \"id\" field. To make the process easier, edit the file and add `\"id\",` at the beginning of the header line.)"
  }, {
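    "metadata" : {
      "id" : "5C1E0F3A9B7D4E62A8F04D2B6C917E35"
    },
    "cell_type" : "markdown",
    "source" : "The header fix described above can also be done programmatically. The following is a minimal sketch and not part of the original workflow: `addIdHeader` is a hypothetical helper, and it assumes the file is small enough to read into memory.\n\n```scala\nimport java.nio.charset.StandardCharsets\nimport java.nio.file.{Files, Paths}\nimport scala.collection.JavaConverters._\n\n// Hypothetical helper: prepends the missing \"id\" column name\n// to the header line of a CSV file, rewriting the file in place.\ndef addIdHeader(path: String): Unit = {\n  val idColumn = \"\\\"id\\\",\"\n  val file = Paths.get(path)\n  val lines = Files.readAllLines(file, StandardCharsets.UTF_8).asScala\n  // Rewrite only when the header does not already start with the id column\n  if (lines.nonEmpty && !lines.head.startsWith(idColumn)) {\n    val fixed = (idColumn + lines.head) +: lines.tail\n    Files.write(file, fixed.asJava, StandardCharsets.UTF_8)\n  }\n}\n```\n\nFor example, `addIdHeader(s\"$dataDir/datatraining.txt\")` would fix the training file in place. Running it a second time leaves the file unchanged, because the header then already starts with `\"id\",`."
  }, {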
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "133729DDD9CA44D98B2406F157B10BCD"
    },
    "cell_type" : "code",
    "source" : [ "val sensorData = sparkSession.read\n", "        .option(\"header\",true)\n", "        .option(\"inferSchema\", true)\n", "        .csv(s\"$dataDir/datatraining.txt\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "sensorData: org.apache.spark.sql.DataFrame = [id: int, date: timestamp ... 6 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 1,
      "time" : "Took: 1.044s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "23F385965316440F8CE52ECB7F7D4953"
    },
    "cell_type" : "code",
    "source" : [ "// check that the inferred schema corresponds to the expected types\n", "sensorData.printSchema" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "root\n |-- id: integer (nullable = true)\n |-- date: timestamp (nullable = true)\n |-- Temperature: double (nullable = true)\n |-- Humidity: double (nullable = true)\n |-- Light: double (nullable = true)\n |-- CO2: double (nullable = true)\n |-- HumidityRatio: double (nullable = true)\n |-- Occupancy: integer (nullable = true)\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 2,
      "time" : "Took: 0.962s, at 2018-08-29 14:45"
    } ]
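  }, {
    "metadata" : {
      "id" : "8D42B7C61F9E4A03B5E6720C94A1D8F3"
    },
    "cell_type" : "markdown",
    "source" : "Schema inference requires an extra pass over the data, and its result depends on the file contents. As an alternative, the schema printed above can be declared explicitly. This is a minimal sketch: the names `sensorSchema` and `sensorDataTyped` are illustrative and not used elsewhere in the notebook, and the `timestampFormat` value is an assumption based on the sample row shown earlier.\n\n```scala\nimport org.apache.spark.sql.types._\n\n// Column names and types copied from the printSchema output above\nval sensorSchema = StructType(Seq(\n  StructField(\"id\", IntegerType),\n  StructField(\"date\", TimestampType),\n  StructField(\"Temperature\", DoubleType),\n  StructField(\"Humidity\", DoubleType),\n  StructField(\"Light\", DoubleType),\n  StructField(\"CO2\", DoubleType),\n  StructField(\"HumidityRatio\", DoubleType),\n  StructField(\"Occupancy\", IntegerType)\n))\n\n// Same file as before, read with the declared schema instead of inferSchema\nval sensorDataTyped = sparkSession.read\n  .option(\"header\", true)\n  .option(\"timestampFormat\", \"yyyy-MM-dd HH:mm:ss\")  // assumed format\n  .schema(sensorSchema)\n  .csv(s\"$dataDir/datatraining.txt\")\n```\n\nWith a declared schema, the column types stay fixed even if the file contents change, so a type problem in the data is less likely to go unnoticed."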
  }, {
    "metadata" : {
      "id" : "D3B58EDDA35C494D92EF75DB1C1D5A67"
    },
    "cell_type" : "markdown",
    "source" : "## Building a Logistic Regression Model"
  }, {
    "metadata" : {
      "id" : "B1425DB6CAC946759EEDEE4EBFA4A1D2"
    },
    "cell_type" : "markdown",
    "source" : "To train our model, we are going to build an ML Pipeline."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "B4F822F660E24517878AE2C1007B1EB1"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.ml.Pipeline\n", "import org.apache.spark.ml.classification.LogisticRegression\n", "val lr = new LogisticRegression()\n", "  .setMaxIter(10)            // maximum number of optimizer iterations\n", "  .setRegParam(0.1)          // overall regularization strength\n", "  .setElasticNetParam(0.8)   // mix of penalties: 0.0 is pure L2, 1.0 is pure L1" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.ml.Pipeline\nimport org.apache.spark.ml.classification.LogisticRegression\nlr: org.apache.spark.ml.classification.LogisticRegression = logreg_9b02575b4033\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 3,
      "time" : "Took: 0.721s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "D7BB26D91E7F45AB83800603786814A5"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.ml.feature.VectorAssembler\n", "// Combine the sensor readings into a single feature vector column\n", "val assembler = new VectorAssembler()\n", "    .setInputCols(Array(\"Temperature\", \"Humidity\", \"Light\", \"CO2\", \"HumidityRatio\"))\n", "    .setOutputCol(\"features\")" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.ml.feature.VectorAssembler\nassembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_530fd58b21e5\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 4,
      "time" : "Took: 0.671s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "6640A8AE3C2B47CAA7A4B20DFA64F16C"
    },
    "cell_type" : "code",
    "source" : [ "val labeledData = sensorData.withColumn(\"label\", $\"Occupancy\".cast(\"Double\"))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "labeledData: org.apache.spark.sql.DataFrame = [id: int, date: timestamp ... 7 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 5,
      "time" : "Took: 0.680s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "91E8109D6FCE43A58F7C67980E409D01"
    },
    "cell_type" : "code",
    "source" : [ "labeledData.printSchema" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "root\n |-- id: integer (nullable = true)\n |-- date: timestamp (nullable = true)\n |-- Temperature: double (nullable = true)\n |-- Humidity: double (nullable = true)\n |-- Light: double (nullable = true)\n |-- CO2: double (nullable = true)\n |-- HumidityRatio: double (nullable = true)\n |-- Occupancy: integer (nullable = true)\n |-- label: double (nullable = true)\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 6,
      "time" : "Took: 0.804s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "id" : "862B5C38FD0D4FAAACEC27D580DE5A36"
    },
    "cell_type" : "markdown",
    "source" : "We define the Pipeline as a sequence of stages. \nIn our case, the stages are the assembler, which brings the features together into a `Vector`, and the parameterized _Logistic Regression_ `Estimator` that we instantiated earlier."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "EC24EE52015D490EB6C6FB84AF16E73E"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.ml.Pipeline\n", "val pipeline = new Pipeline().setStages(Array(assembler, lr))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.ml.Pipeline\npipeline: org.apache.spark.ml.Pipeline = pipeline_026fbfe8ca4c\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 7,
      "time" : "Took: 0.769s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "id" : "79C81BC30E6340B8A51CFE994EEF54E6"
    },
    "cell_type" : "markdown",
    "source" : "The `fit` method of a `Pipeline` trains its stages on a dataset and produces\na `PipelineModel` that we can use to make predictions on new data."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "025AA6212956453F8A152C6F84A8BA26"
    },
    "cell_type" : "code",
    "source" : [ "val model = pipeline.fit(labeledData)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "model: org.apache.spark.ml.PipelineModel = pipeline_026fbfe8ca4c\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 8,
      "time" : "Took: 2.919s, at 2018-08-29 14:45"
    } ]
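  }, {
    "metadata" : {
      "id" : "2F7A91C4E0B84D56937C1A5E8D60B2F4"
    },
    "cell_type" : "markdown",
    "source" : "To make the two stages concrete, the sketch below runs them by hand, which is roughly what `pipeline.fit` does for us, and then inspects the fitted logistic regression stage. The names `assembled`, `lrModelByHand` and `lrModelFromPipeline` are illustrative and not used elsewhere in the notebook.\n\n```scala\nimport org.apache.spark.ml.classification.LogisticRegressionModel\n\n// Stage 1: the assembler is a Transformer; it only adds the \"features\" column\nval assembled = assembler.transform(labeledData)\n\n// Stage 2: the logistic regression is an Estimator; fitting it yields a model\nval lrModelByHand: LogisticRegressionModel = lr.fit(assembled)\n\n// The fitted stage can also be read out of the PipelineModel trained above\nval lrModelFromPipeline = model.stages.last.asInstanceOf[LogisticRegressionModel]\n\n// One coefficient per input column of the assembler, plus an intercept\nprintln(s\"coefficients: ${lrModelFromPipeline.coefficients}\")\nprintln(s\"intercept: ${lrModelFromPipeline.intercept}\")\n```\n\nUsing the `Pipeline` instead of the manual steps keeps feature assembly and model together, so the same preparation is applied when the model is later used on new data."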
  }, {
    "metadata" : {
      "id" : "B9EF97F9597949848CA59694AD6CC0D4"
    },
    "cell_type" : "markdown",
    "source" : "## Validating the Model"
  }, {
    "metadata" : {
      "id" : "E79AFD7A9F9A47FB920997F66618C6A8"
    },
    "cell_type" : "markdown",
    "source" : "To validate the model, we use data for which we know the expected outcome.\n\nThat way, we can compare the actual value with the predicted one and evaluate how well our model is performing."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "BD841317241C49EE89AF15B50F4D272B"
    },
    "cell_type" : "code",
    "source" : [ "val testData = sparkSession.read\n", "        .option(\"header\",true)\n", "        .option(\"inferSchema\", true)\n", "        .csv(s\"$dataDir/datatest.txt\")\n", "        .withColumn(\"label\", $\"Occupancy\".cast(\"Double\"))" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "testData: org.apache.spark.sql.DataFrame = [id: int, date: timestamp ... 7 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 9,
      "time" : "Took: 0.931s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "440B4A22231341DFAB245B14BCD26B3C"
    },
    "cell_type" : "code",
    "source" : [ "val predictions = model.transform(testData)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "predictions: org.apache.spark.sql.DataFrame = [id: int, date: timestamp ... 11 more fields]\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 10,
      "time" : "Took: 0.713s, at 2018-08-29 14:45"
    } ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A27E17D4A1104629877D5B98B562F75B"
    },
    "cell_type" : "code",
    "source" : [ "predictions.select($\"Occupancy\", $\"rawPrediction\",$\"probability\", $\"prediction\").show(10, truncate=false )" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "+---------+----------------------------------------+----------------------------------------+----------+\n|Occupancy|rawPrediction                           |probability                             |prediction|\n+---------+----------------------------------------+----------------------------------------+----------+\n|1        |[-1.165992741474785,1.165992741474785]  |[0.2375800787231101,0.76241992127689]   |1.0       |\n|1        |[-1.134749462216978,1.134749462216978]  |[0.24328566985645464,0.7567143301435453]|1.0       |\n|1        |[-1.1084831249923008,1.1084831249923008]|[0.24815378909083458,0.7518462109091655]|1.0       |\n|1        |[-0.5854424488314132,0.5854424488314132]|[0.35768125007101514,0.6423187499289849]|1.0       |\n|1        |[-0.5536855193052468,0.5536855193052468]|[0.3650097638103235,0.6349902361896765] |1.0       |\n|1        |[-1.1082268289859558,1.1082268289859558]|[0.24820161021665613,0.7517983897833439]|1.0       |\n|1        |[-0.9054518379194303,0.9054518379194303]|[0.2879314327729891,0.712068567227011]  |1.0       |\n|1        |[-0.7174372358135888,0.7174372358135888]|[0.3279575705150612,0.6720424294849388] |1.0       |\n|1        |[-0.5043690269606838,0.5043690269606838]|[0.37651448191486914,0.6234855180851309]|1.0       |\n|1        |[-0.7436762950278142,0.7436762950278142]|[0.32220076291467276,0.6777992370853272]|1.0       |\n+---------+----------------------------------------+----------------------------------------+----------+\nonly showing top 10 rows\n\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 11,
      "time" : "Took: 1.169s, at 2018-08-29 14:45"
    } ]
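  }, {
    "metadata" : {
      "id" : "C03E58A7D2914F6B8A1E47B9F5D0632C"
    },
    "cell_type" : "markdown",
    "source" : "For binary logistic regression, the two entries of `rawPrediction` are the negated and the plain margin of the linear model, and `probability` results from applying the logistic function to them. The sketch below checks this in plain Scala, without Spark: `sigmoid` is a helper defined here for illustration, and the margin is copied from the first row of the output above.\n\n```scala\n// Logistic (sigmoid) function: maps a real-valued margin into (0, 1)\ndef sigmoid(margin: Double): Double = 1.0 / (1.0 + math.exp(-margin))\n\n// Margin for class 1 (occupied), copied from the first row shown above\nval margin = 1.165992741474785\n\nval pOccupied = sigmoid(margin)   // compare with the second entry of probability\nval pEmpty = sigmoid(-margin)     // compare with the first entry of probability\n\nprintln(s\"P(occupied) = $pOccupied, P(empty) = $pEmpty\")\n```\n\nThe two values should agree with the `probability` column of that row up to floating-point rounding, and they sum to 1."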
  }, {
    "metadata" : {
      "id" : "6F60600DC69541088F3C957C97672F0A"
    },
    "cell_type" : "markdown",
    "source" : "### Model Evaluation"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "3E955E8AF73C45E8B345333233247091"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator\n", "val evaluator = new BinaryClassificationEvaluator()\n", "        .setLabelCol(\"label\")\n", "        .setRawPredictionCol(\"rawPrediction\")\n", "        .setMetricName(\"areaUnderROC\")\n", "// Evaluates predictions and returns AUC (Area Under ROC Curve - larger is better, 1 is perfect).\n", "// Note: despite its name, `accuracy` holds the AUC, not the fraction of correct predictions.\n", "val accuracy = evaluator.evaluate(predictions)" ],
    "outputs" : [ {
      "name" : "stdout",
      "output_type" : "stream",
      "text" : "import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator\nevaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_4164a6bf8fcc\naccuracy: Double = 0.9917154635767224\n"
    }, {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 12,
      "time" : "Took: 1.289s, at 2018-08-29 14:45"
    } ]
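  }, {
    "metadata" : {
      "id" : "71B6D0E94C3A4825A9F2E6C81D47305B"
    },
    "cell_type" : "markdown",
    "source" : "The value computed above is the area under the ROC curve, which measures how well the model ranks occupied rooms above empty ones. It is not the fraction of correct predictions. As a complementary sketch, classification accuracy and a confusion matrix can be computed from the same `predictions` DataFrame. The names `accuracyEvaluator` and `classificationAccuracy` are illustrative.\n\n```scala\nimport org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator\n\n// Fraction of rows where the predicted class equals the label\nval accuracyEvaluator = new MulticlassClassificationEvaluator()\n  .setLabelCol(\"label\")\n  .setPredictionCol(\"prediction\")\n  .setMetricName(\"accuracy\")\nval classificationAccuracy = accuracyEvaluator.evaluate(predictions)\n\n// Confusion matrix: row counts for every (label, prediction) pair\npredictions\n  .groupBy(\"label\", \"prediction\")\n  .count()\n  .orderBy(\"label\", \"prediction\")\n  .show()\n```\n\nLooking at both metrics helps when the classes are imbalanced, since a high AUC does not by itself say how many rooms are misclassified at the default threshold."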
  }, {
    "metadata" : {
      "id" : "39829315F4684BB78F2BD76FA0B5FEAF"
    },
    "cell_type" : "markdown",
    "source" : "## Storing the Model for Later Use\nWe store the trained model on disk.\nIt can be read back from disk and applied at a later stage."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "A6CCE6F6B7144B2B8889DE735ECDF5B5"
    },
    "cell_type" : "code",
    "source" : [ "model.write.overwrite.save(modelFile)" ],
    "outputs" : [ {
      "metadata" : { },
      "data" : {
        "text/html" : ""
      },
      "output_type" : "execute_result",
      "execution_count" : 13,
      "time" : "Took: 2.136s, at 2018-08-29 14:46"
    } ]
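  }, {
    "metadata" : {
      "id" : "E94A2C7F18D54B60B37C9E0A5F261D84"
    },
    "cell_type" : "markdown",
    "source" : "Reading the model back can be sketched as follows, assuming `modelFile` and `testData` are still defined as above. The name `restoredModel` is illustrative.\n\n```scala\nimport org.apache.spark.ml.PipelineModel\n\n// Read the pipeline (assembler + fitted logistic regression) back from disk\nval restoredModel = PipelineModel.load(modelFile)\n\n// Apply the restored model to the labeled test data\nrestoredModel.transform(testData)\n  .select($\"Occupancy\", $\"probability\", $\"prediction\")\n  .show(5, truncate = false)\n```\n\nBecause the whole pipeline is saved, the restored model expects the raw sensor columns as input and assembles the feature vector itself."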
  } ],
  "nbformat" : 4
}